// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/executor"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/session/sessionapi"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/variable"
	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/stretchr/testify/require"
)

// batch create table with table id reused
func TestSplitBatchCreateTableWithTableId(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists table_id_reused1")
	tk.MustExec("drop table if exists table_id_reused2")
	tk.MustExec("drop table if exists table_id_new")

	d := dom.DDL()
	require.NotNil(t, d)

	infos1 := []*model.TableInfo{}
	infos1 = append(infos1, &model.TableInfo{
		ID:   124,
		Name: ast.NewCIStr("table_id_reused1"),
	})
	infos1 = append(infos1, &model.TableInfo{
		ID:   125,
		Name: ast.NewCIStr("table_id_reused2"),
	})
	querys1 := []string{
		"create table test.table_id_reused1 (id int)",
		"create table test.table_id_reused2 (id int)",
	}

	sctx := tk.Session()

	// keep/reused table id verification
	sctx.SetValue(sessionctx.QueryString, "TODO")
	err := executor.SplitBatchCreateTableForTest(sctx, ast.NewCIStr("test"), infos1, querys1, ddl.WithIDAllocated(true))
	require.NoError(t, err)
	require.Equal(t, "create table test.table_id_reused1 (id int);create table test.table_id_reused2 (id int);", sctx.Value(sessionctx.QueryString))

	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_reused1'").
		Check(testkit.Rows("124"))
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_reused2'").
		Check(testkit.Rows("125"))
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)

	// allocate new table id verification
	// query the global id
	var id int64
	err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
		m := meta.NewMutator(txn)
		var err error
		id, err = m.GenGlobalID()
		return err
	})

	require.NoError(t, err)

	infos2 := []*model.TableInfo{}
	infos2 = append(infos2, &model.TableInfo{
		ID:   124,
		Name: ast.NewCIStr("table_id_new"),
	})
	querys2 := []string{"create table test.table_id_new (id int)"}

	tk.Session().SetValue(sessionctx.QueryString, "TODO")
	err = executor.SplitBatchCreateTableForTest(sctx, ast.NewCIStr("test"), infos2, querys2)
	require.NoError(t, err)
	require.Equal(t, "create table test.table_id_new (id int);", sctx.Value(sessionctx.QueryString))

	idGen, ok := tk.MustQuery(
		"select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").
		Rows()[0][0].(string)
	require.True(t, ok)
	idGenNum, err := strconv.ParseInt(idGen, 10, 64)
	require.NoError(t, err)
	require.Greater(t, idGenNum, id)

	// a empty table info with len(info3) = 0
	infos3 := []*model.TableInfo{}
	querys3 := []string{}

	err = executor.SplitBatchCreateTableForTest(sctx, ast.NewCIStr("test"), infos3, querys3, ddl.WithIDAllocated(true))
	require.NoError(t, err)
	require.Equal(t, "", sctx.Value(sessionctx.QueryString))
}

// batch create table with table id reused
func TestSplitBatchCreateTable(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists table_1")
	tk.MustExec("drop table if exists table_2")
	tk.MustExec("drop table if exists table_3")

	d := dom.DDL()
	require.NotNil(t, d)

	infos := []*model.TableInfo{}
	infos = append(infos, &model.TableInfo{
		ID:   1234,
		Name: ast.NewCIStr("tables_1"),
	})
	infos = append(infos, &model.TableInfo{
		ID:   1235,
		Name: ast.NewCIStr("tables_2"),
	})
	infos = append(infos, &model.TableInfo{
		ID:   1236,
		Name: ast.NewCIStr("tables_3"),
	})
	querys := []string{
		"create table test.tables_1 (id int)",
		"create table test.tables_2 (id int)",
		"create table test.tables_3 (id int)"}

	sctx := tk.Session()

	// keep/reused table id verification
	tk.Session().SetValue(sessionctx.QueryString, "TODO")
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)"))
	err := executor.SplitBatchCreateTableForTest(sctx, ast.NewCIStr("test"), infos, querys, ddl.WithIDAllocated(true))
	require.NoError(t, err)
	require.Equal(t, "create table test.tables_3 (id int);", sctx.Value(sessionctx.QueryString))

	tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3"))
	jobs := tk.MustQuery("admin show ddl jobs").Rows()
	require.Greater(t, len(jobs), 3)
	// check table_1
	job1 := jobs[0]
	require.Equal(t, "test", job1[1])
	require.Equal(t, "tables_3", job1[2])
	require.Equal(t, "create tables", job1[3])
	require.Equal(t, "public", job1[4])

	// check table_2
	job2 := jobs[1]
	require.Equal(t, "test", job2[1])
	require.Equal(t, "tables_2", job2[2])
	require.Equal(t, "create tables", job2[3])
	require.Equal(t, "public", job2[4])

	// check table_3
	job3 := jobs[2]
	require.Equal(t, "test", job3[1])
	require.Equal(t, "tables_1", job3[2])
	require.Equal(t, "create tables", job3[3])
	require.Equal(t, "public", job3[4])

	// check reused table id
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").
		Check(testkit.Rows("1234"))
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").
		Check(testkit.Rows("1235"))
	tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").
		Check(testkit.Rows("1236"))

	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge"))
}

// batch create table with table id reused
func TestSplitBatchCreateTableFailWithEntryTooLarge(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists table_1")
	tk.MustExec("drop table if exists table_2")
	tk.MustExec("drop table if exists table_3")

	d := dom.DDL()
	require.NotNil(t, d)

	infos := []*model.TableInfo{}
	infos = append(infos, &model.TableInfo{
		Name: ast.NewCIStr("tables_1"),
	})
	infos = append(infos, &model.TableInfo{
		Name: ast.NewCIStr("tables_2"),
	})
	infos = append(infos, &model.TableInfo{
		Name: ast.NewCIStr("tables_3"),
	})
	querys := []string{
		"create table test.tables_1 (id int)",
		"create table test.tables_2 (id int)",
		"create table test.tables_3 (id int)"}

	sctx := tk.Session()

	tk.Session().SetValue(sessionctx.QueryString, "TODO")
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)"))
	err := executor.SplitBatchCreateTableForTest(sctx, ast.NewCIStr("test"), infos, querys)
	require.Equal(t, "create table test.tables_1 (id int);", sctx.Value(sessionctx.QueryString))
	require.True(t, kv.ErrEntryTooLarge.Equal(err))

	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/RestoreBatchCreateTableEntryTooLarge"))
}

func TestBRIECreateDatabase(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists db_1")
	tk.MustExec("drop database if exists db_1")

	d := dom.DDL()
	require.NotNil(t, d)

	sctx := tk.Session()
	originQueryString := sctx.Value(sessionctx.QueryString)
	schema1 := &model.DBInfo{
		ID:      1230,
		Name:    ast.NewCIStr("db_1"),
		Charset: "utf8mb4",
		Collate: "utf8mb4_bin",
		State:   model.StatePublic,
	}
	err := executor.BRIECreateDatabase(sctx, schema1, "/* from test */")
	require.NoError(t, err)

	schema2 := &model.DBInfo{
		ID:      1240,
		Name:    ast.NewCIStr("db_2"),
		Charset: "utf8mb4",
		Collate: "utf8mb4_bin",
		State:   model.StatePublic,
	}
	err = executor.BRIECreateDatabase(sctx, schema2, "")
	require.NoError(t, err)
	require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString))
	tk.MustExec("use db_1")
	tk.MustExec("use db_2")
}

func mockTableInfo(t *testing.T, sctx sessionctx.Context, createSQL string) *model.TableInfo {
	node, err := parser.New().ParseOneStmt(createSQL, "", "")
	require.NoError(t, err)
	info, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
	require.NoError(t, err)
	info.State = model.StatePublic
	return info
}

func TestBRIECreateTable(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists table_1")
	tk.MustExec("drop table if exists table_2")

	d := dom.DDL()
	require.NotNil(t, d)

	sctx := tk.Session()
	originQueryString := sctx.Value(sessionctx.QueryString)
	dbName := ast.NewCIStr("test")
	tableInfo := mockTableInfo(t, sctx, "create table test.table_1 (a int primary key, b json, c varchar(20))")
	tableInfo.ID = 1230
	err := executor.BRIECreateTable(sctx, dbName, tableInfo, "/* from test */")
	require.NoError(t, err)

	tableInfo.ID = 1240
	tableInfo.Name = ast.NewCIStr("table_2")
	err = executor.BRIECreateTable(sctx, dbName, tableInfo, "")
	require.NoError(t, err)
	require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString))
	tk.MustExec("desc table_1")
	tk.MustExec("desc table_2")
}

func TestBRIECreateTables(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tableInfos := make([]*model.TableInfo, 100)
	for i := range tableInfos {
		tk.MustExec(fmt.Sprintf("drop table if exists table_%d", i))
	}

	d := dom.DDL()
	require.NotNil(t, d)

	sctx := tk.Session()
	originQueryString := sctx.Value(sessionctx.QueryString)
	for i := range tableInfos {
		tableInfos[i] = mockTableInfo(t, sctx, fmt.Sprintf("create table test.table_%d (a int primary key, b json, c varchar(20))", i))
		tableInfos[i].ID = 1230 + int64(i)
	}
	err := executor.BRIECreateTables(sctx, map[string][]*model.TableInfo{"test": tableInfos}, "/* from test */")
	require.NoError(t, err)

	require.Equal(t, originQueryString, sctx.Value(sessionctx.QueryString))
	for i := range tableInfos {
		tk.MustExec(fmt.Sprintf("desc table_%d", i))
	}
}

type fakeDDLExecutor struct {
	ddl.Executor
	queryList        []string
	successQueryList []string
	maxCount         int
}

func (f *fakeDDLExecutor) BatchCreateTableWithInfo(
	sctx sessionctx.Context,
	schema ast.CIStr,
	info []*model.TableInfo,
	cs ...ddl.CreateTableOption,
) error {
	f.queryList = append(f.queryList, sctx.Value(sessionctx.QueryString).(string))
	if len(info) > f.maxCount {
		switch rand.Int() % 2 {
		case 0:
			return kv.ErrTxnTooLarge
		case 1:
			return kv.ErrEntryTooLarge
		}
	}
	f.successQueryList = append(f.successQueryList, sctx.Value(sessionctx.QueryString).(string))
	return nil
}

type fakeSessionContext struct {
	sessionapi.Session
	ddlexecutor *fakeDDLExecutor
	values      map[string]any
	vars        *variable.SessionVars
}

func newFakeSessionContext(ddlexecutor *fakeDDLExecutor) *fakeSessionContext {
	return &fakeSessionContext{
		ddlexecutor: ddlexecutor,
		values:      make(map[string]any),
		vars:        &variable.SessionVars{},
	}
}

func (f *fakeSessionContext) GetDomain() any {
	dom := &domain.Domain{}
	dom.SetDDL(nil, f.ddlexecutor)
	return dom
}

func (f *fakeSessionContext) Value(key fmt.Stringer) any {
	return f.values[key.String()]
}

func (f *fakeSessionContext) SetValue(key fmt.Stringer, value any) {
	f.values[key.String()] = value
}

func (f *fakeSessionContext) GetSessionVars() *variable.SessionVars {
	return f.vars
}

func TestSplitTablesQueryMatch(t *testing.T) {
	ddlexecutor := &fakeDDLExecutor{maxCount: 1}
	sctx := newFakeSessionContext(ddlexecutor)
	tables := map[string][]*model.TableInfo{
		"test": {
			{Name: ast.NewCIStr("t1")},
			{Name: ast.NewCIStr("t2")},
		},
		"test2": {
			{Name: ast.NewCIStr("t3")},
		},
	}

	err := executor.BRIECreateTables(sctx, tables, "/*from(br)*/")
	require.NoError(t, err)
	require.Len(t, ddlexecutor.queryList, 4)
	require.Equal(t, "/*from(br)*/CREATE TABLE `t1` (\n\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;/*from(br)*/CREATE TABLE `t2` (\n\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;", ddlexecutor.queryList[0])
	require.Equal(t, "/*from(br)*/CREATE TABLE `t1` (\n\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;", ddlexecutor.queryList[1])
	require.Equal(t, "/*from(br)*/CREATE TABLE `t2` (\n\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;", ddlexecutor.queryList[2])
	require.Equal(t, "/*from(br)*/CREATE TABLE `t3` (\n\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;", ddlexecutor.queryList[3])
	require.Len(t, ddlexecutor.successQueryList, 3)
	require.Equal(t, ddlexecutor.queryList[1], ddlexecutor.successQueryList[0])
	require.Equal(t, ddlexecutor.queryList[2], ddlexecutor.successQueryList[1])
	require.Equal(t, ddlexecutor.queryList[3], ddlexecutor.successQueryList[2])
}
